#include <boost/thread/mutex.hpp>
#include <zlib.h>
#include <boost/format.hpp>
#include <boost/bind.hpp>

#include "ClientManager.h"
#include "trace_worker.h"
#include "Base64.h"
#include "conn-pool/ConnPool.h"
#include "JsonHelper.h"
#include "IweChenCfg.h"

// Mutex locked by CClientManager::instance() while the singleton is being created.
static boost::mutex g_insMutexCalc;
// The one shared CClientManager; stays NULL until instance() allocates it.
CClientManager* CClientManager::_instance = NULL;

/**
 * Returns the shared CClientManager, allocating it on the first call.
 *
 * g_insMutexCalc is held for the whole call, including the common case where
 * the object already exists: reading _instance without the lock would race
 * with the write performed by the thread that creates the object.
 *
 * @return the singleton; the object is never deleted by this function.
 */
CClientManager* CClientManager::instance() 
{	
	boost::unique_lock<boost::mutex> guardMutex(g_insMutexCalc);
	if (NULL == _instance)
	{
		_instance = new CClientManager;
	}
	return _instance;
}


/**
 * Sets the serial counter to zero, connects the publisher, installs the
 * built-in handlers and finally loads the configured Thrift clients.
 */
CClientManager::CClientManager()
    : m_serialNumber(0)
{
    // The publisher is connected before any handler is registered.
    InitPublisher();

    // Built-in handlers, keyed by the method name callers dispatch on.
    RegisterMethod("ping", boost::bind(&CClientManager::Ping, this, _1, _2, _3, _4, _5));
    RegisterMethod("websocket", boost::bind(&CClientManager::Websocket, this, _1, _2, _3, _4, _5));
    RegisterMethod("webview", boost::bind(&CClientManager::Webview, this, _1, _2, _3, _4, _5));

    // Runs last: a configured Thrift method with the same name as a built-in
    // replaces the built-in entry in m_methodMap.
    ConnPoolInit();
}

// Destructor with an empty body: no explicit teardown is performed here.
CClientManager::~CClientManager()
{
}

void CClientManager::ConnPoolInit()
{
    const rapidjson::Value &defaultValue = CWeChenCfg::instance()->Cfg()["THRIFT"]["default"];
    CThriftConnPool::instance()->Init(defaultValue);

    const rapidjson::Value &serverValue = CWeChenCfg::instance()->Cfg()["THRIFT"]["server"];
    for (unsigned int serverIndex=0; serverIndex<serverValue.Size(); ++serverIndex)
    {
        const rapidjson::Value &configValue = serverValue[serverIndex];
        boost::shared_ptr<CThriftClient> thriftClient = boost::shared_ptr<CThriftClient>(new CThriftClient(configValue));
        RegisterMethods(configValue["methods"], thriftClient);
        m_thriftClientVector.push_back(thriftClient);
    }
}

/**
 * Stores a handler in the method map under the given name, replacing any
 * handler already registered under that name.
 *
 * @param methodName NUL-terminated method name; NULL is rejected because it
 *                   cannot be converted into a map key.
 * @param method     handler to store.
 * @return false when methodName is NULL, true once the handler is stored.
 */
bool CClientManager::RegisterMethod(const char *methodName, Method method)
{
    if (NULL == methodName)
    {
        return false;
    }
    m_methodMap[methodName] = method;
    return true;
}

/**
 * Registers every method name in a JSON array so that it dispatches to
 * CThriftClient::thriftMethod on the given client.
 *
 * @param methodsValue JSON array whose elements are read with GetString().
 * @param thriftClient client bound into each registered handler; the bind
 *                     stores its own copy of the shared pointer.
 */
void CClientManager::RegisterMethods(const rapidjson::Value &methodsValue, boost::shared_ptr<CThriftClient> &thriftClient)
{
    trace_worker();
    const unsigned int methodCount = methodsValue.Size();
    for (unsigned int i = 0; i != methodCount; ++i)
    {
        const char *const name = methodsValue[i].GetString();
        trace_printf("methodName  %s", name);
        RegisterMethod(name, boost::bind(&CThriftClient::thriftMethod, thriftClient, _1, _2, _3, _4, _5));
    }
}

void CClientManager::InitPublisher()
{   trace_worker();
    const rapidjson::Value &redisValue = CWeChenCfg::instance()->Cfg()["REDIS"];
    std::string redisHost = redisValue["host"].GetString();
	int redisPort = redisValue["port"].GetInt();
    m_publisher.connect(redisHost, redisPort);
}

/**
 * Copies srcStr into dstStr, dropping every character that occurs in cleanChars.
 *
 * The result is assembled in a separate string and swapped into dstStr at the
 * end, so dstStr may refer to the same object as srcStr. Characters are copied
 * by length, so an embedded NUL that is not listed in cleanChars is kept
 * instead of cutting the result short.
 *
 * @param srcStr     text to filter.
 * @param cleanChars set of characters to remove.
 * @param dstStr     receives the filtered text; previous contents are discarded.
 */
void CClientManager::CleanStr(const std::string& srcStr, const std::string cleanChars, std::string& dstStr)
{
    std::string kept;
    kept.reserve(srcStr.size());
    for (std::string::size_type i = 0; i < srcStr.size(); ++i)
    {
        if (cleanChars.find(srcStr[i]) == std::string::npos)
        {
            kept.push_back(srcStr[i]);
        }
    }
    dstStr.swap(kept);
}

std::string &CClientManager::Compress(const std::string &srcData, std::string &dstData)
{   
    unsigned long dstBufferLen = srcData.size() + 64;
    unsigned char *dstBuffer = (unsigned char *)malloc(dstBufferLen);
    
    ::compress(dstBuffer, &dstBufferLen, (const Bytef*)srcData.c_str(), (uLong)srcData.size());
    dstData.assign((const char *)dstBuffer, dstBufferLen);
    free(dstBuffer);
    return dstData;
}


/**
 * Dispatches a method with JSON arguments and parses the reply as JSON.
 *
 * The "openId" passed to the handler is taken from reqArgsValue when that
 * value is an object holding a string "openId"; otherwise an empty id is
 * used. The type checks keep RapidJSON accessors from being called on a value
 * of the wrong type.
 *
 * @param returnDocument receives the parsed reply.
 * @param method         name of the method to dispatch.
 * @param reqArgsValue   request arguments; serialized and passed to the handler.
 * @return true only when the handler succeeded and returned non-empty, valid JSON.
 */
bool CClientManager::thriftMethod(rapidjson::Document& returnDocument, const std::string& method, const rapidjson::Value& reqArgsValue)
{
    std::string openId;
    if (reqArgsValue.IsObject()
        && reqArgsValue.HasMember("openId")
        && reqArgsValue["openId"].IsString())
    {
        openId = reqArgsValue["openId"].GetString();
    }

    rapidjson::StringBuffer buffer;
    std::string _return;
    if (!thriftMethod(_return, openId, method, CJsonHelper::toString(reqArgsValue, buffer)))
    {
        return false;
    }
    if (_return.empty() || returnDocument.Parse(_return.c_str()).HasParseError())
    {
        return false;
    }
    return true;
}

/**
 * Looks up the handler registered for `method` and invokes it with the
 * final flag set to false.
 *
 * The map is probed with count() before operator[] is used, because
 * operator[] would insert an empty entry for every unknown method name.
 *
 * @param _return receives the handler's reply.
 * @param openId  caller id forwarded to the handler.
 * @param method  name of the method to dispatch.
 * @param reqArgs serialized request arguments forwarded to the handler.
 * @return false when no usable handler is registered, otherwise the handler's result.
 */
bool CClientManager::thriftMethod(std::string& _return, const std::string &openId, const std::string& method, const std::string& reqArgs)
{
    if (m_methodMap.count(method) == 0)
    {
        return false;
    }
    Method &methodfunction = m_methodMap[method];
    if (!methodfunction)
    {
        return false;
    }
    return methodfunction(_return, openId, method, reqArgs, false);
}

/**
 * Dispatches a method with the final handler flag set to true and accepts the
 * reply only when it echoes the serial number stamped onto the request.
 *
 * @param returnDocument receives the parsed reply.
 * @param method         name of the method to dispatch.
 * @param reqArgsValue   request arguments; must be a JSON object. A fresh
 *                       "serialNumber" member is written into it (an existing
 *                       one is overwritten rather than duplicated).
 * @param document       supplies the allocator used when adding the member.
 * @return true only when the handler succeeded and the reply is a JSON object
 *         whose integer "serialNumber" equals the one that was sent.
 */
bool CClientManager::thriftMethodSyn(rapidjson::Document& returnDocument, const std::string& method, rapidjson::Value& reqArgsValue, rapidjson::Document &document)
{   trace_worker();
    // Members can only be looked up on, or added to, an object.
    if (!reqArgsValue.IsObject())
    {
        return false;
    }

    std::string openId;
    if (reqArgsValue.HasMember("openId") && reqArgsValue["openId"].IsString())
    {
        openId = reqArgsValue["openId"].GetString();
    }
    const int serialNumber = ++m_serialNumber;

    // AddMember() does not check for an existing key, so reuse the member if present.
    if (reqArgsValue.HasMember("serialNumber"))
    {
        reqArgsValue["serialNumber"].SetInt(serialNumber);
    }
    else
    {
        reqArgsValue.AddMember("serialNumber", serialNumber, document.GetAllocator());
    }

    rapidjson::StringBuffer buffer;
    std::string _return;
    if (!thriftMethodSyn(_return, openId, method, CJsonHelper::toString(reqArgsValue, buffer)))
    {
        return false;
    }
    if (_return.empty() || returnDocument.Parse(_return.c_str()).HasParseError())
    {
        return false;
    }
    // Validate the reply's shape before reading the echoed serial number.
    if (!returnDocument.IsObject()
        || !returnDocument.HasMember("serialNumber")
        || !returnDocument["serialNumber"].IsInt())
    {
        return false;
    }
    return returnDocument["serialNumber"].GetInt() == serialNumber;
}

/**
 * Looks up the handler registered for `method` and invokes it with the
 * final flag set to true, tracing the request and the reply.
 *
 * The map is probed with count() before operator[] is used, because
 * operator[] would insert an empty entry for every unknown method name.
 *
 * @param _return receives the handler's reply.
 * @param openId  caller id forwarded to the handler.
 * @param method  name of the method to dispatch.
 * @param reqArgs serialized request arguments forwarded to the handler.
 * @return false when no usable handler is registered, otherwise the handler's result.
 */
bool CClientManager::thriftMethodSyn(std::string& _return, const std::string &openId, const std::string& method, const std::string& reqArgs)
{   trace_worker();
    trace_printf("openId.c_str(), method.c_str(), reqArgs.c_str()  %s  %s  %s", openId.c_str(), method.c_str(), reqArgs.c_str());
    if (m_methodMap.count(method) == 0)
    {
        return false;
    }
    Method &methodfunction = m_methodMap[method];
    if (!methodfunction)
    {
        return false;
    }
    bool bRet = methodfunction(_return, openId, method, reqArgs, true);
    trace_printf("bRet, _return.c_str()  %d, %s", bRet, _return.c_str());
    return bRet;
}

/**
 * Publishes dataValue to the channel named by its "openId" member.
 *
 * The "openId" member is removed from dataValue before publishing. The
 * payload is "message:" followed by the serialized result of CompressData().
 * The publisher mutex is held from the subscriber check until publish returns.
 *
 * @param document  supplies the allocator used by CompressData().
 * @param dataValue message to send; must be an object with a string "openId"
 *                  and a "content" member. Modified in place.
 * @return false when dataValue is malformed or the channel has no
 *         subscribers; otherwise the result of publish().
 */
bool CClientManager::SendWSData(rapidjson::Document &document, rapidjson::Value &dataValue)
{
    // Reject anything whose members cannot be read safely.
    if (!dataValue.IsObject()
        || !dataValue.HasMember("openId")
        || !dataValue.HasMember("content")
        || !dataValue["openId"].IsString())
    {
        return false;
    }
    // Copy the id before the member that owns the characters is removed.
    const std::string openId = dataValue["openId"].GetString();
    dataValue.RemoveMember("openId");
    
    boost::unique_lock<boost::mutex> uniqueLock(m_publisherMutex);  
    if (m_publisher.numSub(openId) <= 0)
    {
        return false;
    }
    rapidjson::StringBuffer buffer;
    return m_publisher.publish(openId, std::string("message:") + CJsonHelper::toString(CompressData(document, dataValue), buffer));
}

/**
 * Single-id convenience wrapper around IsOnLines().
 *
 * @param openId id to query.
 * @return true when IsOnLines() reports a non-zero flag for the id.
 */
bool CClientManager::IsOnLine(const std::string &openId)
{
    const std::vector<std::string> singleId(1, openId);
    std::vector<int> flags;
    IsOnLines(singleId, flags);
    return flags[0] != 0;
}

bool CClientManager::IsOnLines(const std::vector<std::string> &openIdVector, std::vector<int> &onLineVector)
{
    onLineVector.clear();
    boost::unique_lock<boost::mutex> uniqueLock(m_publisherMutex);
    for (unsigned int openIdVectorIndex=0; openIdVectorIndex<openIdVector.size(); ++openIdVectorIndex)
    {
        m_publisher.numSub(openIdVector[openIdVectorIndex]) <= 0 ? onLineVector.push_back(0) : onLineVector.push_back(1);
    }
    return true;
}

/**
 * When dataValue asks for compression, replaces its "content" member with a
 * compressed, Base64-encoded string of the serialized content.
 *
 * Compression is applied only if dataValue is an object with a boolean
 * "compress" member set to true and a "content" member; in every other case
 * dataValue is returned untouched. The existing "content" member is
 * overwritten in place: adding a second member with the same name would
 * leave the uncompressed payload in the message next to the compressed one.
 *
 * @param document  supplies the allocator for the new string value.
 * @param dataValue message to process; modified in place.
 * @return reference to dataValue.
 */
rapidjson::Value &CClientManager::CompressData(rapidjson::Document &document, rapidjson::Value &dataValue)
{
    if (!dataValue.IsObject()
        || !dataValue.HasMember("compress")
        || !dataValue.HasMember("content"))
    {
        return dataValue;
    }
    const rapidjson::Value &compressFlag = dataValue["compress"];
    if (!compressFlag.IsBool() || !compressFlag.GetBool())
    {
        return dataValue;
    }

    rapidjson::Value &contentValue = dataValue["content"];
    rapidjson::StringBuffer buffer;
    const std::string srcContent = CJsonHelper::toString(contentValue, buffer);

    std::string compressed;
    Compress(srcContent, compressed);

    std::string base64Str;
    CBase64 base64;
    base64.Encode(reinterpret_cast<const unsigned char *>(compressed.c_str()), compressed.size(), base64Str);

    // Strip line breaks from the encoder output.
    std::string encoded;
    CleanStr(base64Str, "\r\n", encoded);

    contentValue.SetString(encoded.c_str(),
                           static_cast<rapidjson::SizeType>(encoded.size()),
                           document.GetAllocator());
    return dataValue;
}

/**
 * Built-in "ping" handler: answers "pong" regardless of its arguments.
 *
 * @param _return receives the text "pong".
 * @return always true.
 */
bool CClientManager::Ping(std::string& _return, const std::string &openId, const std::string &method, const std::string &content, bool isSyn)
{
    _return.assign("pong");
    return true;
}

// Escapes text for use inside a double-quoted JSON string literal:
// quote and backslash get a backslash prefix, control characters below 0x20
// become \u00XX. Every other byte is copied unchanged.
static std::string EscapeForJsonString(const std::string &text)
{
    static const char kHexDigits[] = "0123456789abcdef";
    std::string escaped;
    escaped.reserve(text.size());
    for (std::string::size_type i = 0; i < text.size(); ++i)
    {
        const unsigned char ch = static_cast<unsigned char>(text[i]);
        if (ch == '"' || ch == '\\')
        {
            escaped.push_back('\\');
            escaped.push_back(static_cast<char>(ch));
        }
        else if (ch < 0x20)
        {
            escaped += "\\u00";
            escaped.push_back(kHexDigits[ch >> 4]);
            escaped.push_back(kHexDigits[ch & 0x0F]);
        }
        else
        {
            escaped.push_back(static_cast<char>(ch));
        }
    }
    return escaped;
}

/**
 * Built-in "websocket" handler: turns an https URL plus a code into a wss URL.
 *
 * `content` must be a JSON object with string members "url" and "code". The
 * first occurrence of "https" in the URL is replaced by "wss", then "/" and
 * the code are appended. The reply is {"url":"<result>"}, with the result
 * escaped so it cannot break out of the JSON string.
 *
 * @param _return receives the JSON reply on success.
 * @param content JSON request text.
 * @return false when content is not valid JSON of the expected shape or the
 *         URL does not contain "https"; true otherwise.
 */
bool CClientManager::Websocket(std::string& _return, const std::string& openId, const std::string& method, const std::string& content, bool isSyn)
{   trace_worker();
    trace_printf("content.c_str()  %s", content.c_str());
    rapidjson::Document tmpDocument;
    if (tmpDocument.Parse(content.c_str()).HasParseError()
        || !tmpDocument.IsObject()
        || !tmpDocument.HasMember("code") || !tmpDocument["code"].IsString()
        || !tmpDocument.HasMember("url") || !tmpDocument["url"].IsString())
    {
        return false;
    }
    
    std::string wsUrl = tmpDocument["url"].GetString();
    // replace() throws std::out_of_range for npos, so a URL without "https" is refused.
    const std::string::size_type schemePos = wsUrl.find("https");
    if (schemePos == std::string::npos)
    {
        return false;
    }
    wsUrl.replace(schemePos, 5, "wss");
    wsUrl += "/";
    wsUrl += tmpDocument["code"].GetString();

    const std::string escapedUrl = EscapeForJsonString(wsUrl);
    _return = (boost::format("{\"url\":\"%s\"}") % escapedUrl.c_str()).str();
    trace_printf("_return.c_str()  %s", _return.c_str());
    return true;
}

// Escapes the characters that could end a quoted HTML attribute value or open
// a tag: single quote, double quote, '<' and '>'. '&' is deliberately left
// alone so ordinary URLs with query strings are emitted byte-for-byte as before.
static std::string EscapeForHtmlAttribute(const std::string &text)
{
    std::string escaped;
    escaped.reserve(text.size());
    for (std::string::size_type i = 0; i < text.size(); ++i)
    {
        switch (text[i])
        {
        case '\'': escaped += "&#39;";  break;
        case '"':  escaped += "&quot;"; break;
        case '<':  escaped += "&lt;";   break;
        case '>':  escaped += "&gt;";   break;
        default:   escaped.push_back(text[i]); break;
        }
    }
    return escaped;
}

/**
 * Built-in "webview" handler: wraps `content` in a meta-refresh tag that
 * redirects to it immediately.
 *
 * `content` is placed inside a single-quoted attribute, so quotes and angle
 * brackets are escaped first to keep it from terminating the attribute or
 * injecting markup.
 * NOTE(review): the redirect target itself is not validated here - confirm
 * whether callers restrict it.
 *
 * @param _return receives the generated tag.
 * @param content redirect target.
 * @return always true.
 */
bool CClientManager::Webview(std::string& _return, const std::string& openId, const std::string& method, const std::string& content, bool isSyn)
{   trace_worker();
    const std::string target = EscapeForHtmlAttribute(content);
    _return = (boost::format("<meta http-equiv='refresh' content='0;URL=%s'>") % target.c_str()).str();
    return true;
}


